package com.z.join

import org.apache.flink.api.common.functions.JoinFunction
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * @Author wenz.ma
 * @Date 2021/10/29 14:31
 * @Desc Reads text lines from a socket, splits them into two side-output streams
 *       (students and project scores), joins the two streams and prints the result.
 */
object SocketJoinDemo {

  /**
   * Builds and runs the streaming job.
   *
   * Sample input, one record per line (paste into the socket as-is):
   * {{{
   * stu,1001,张三
   * stu,1002,李四
   * project,10001,1001,语文,90
   * project,10002,1002,语文,99
   * project,10003,1001,数学,79
   * project,10004,1002,数学,120
   * }}}
   *
   * Record layouts:
   *  - `stu,<studentId>,<name>`
   *  - `project,<projectId>,<studentId>,<subject>,<score>`
   *
   * Lines with an unknown prefix, too few fields, or non-numeric ids/scores are
   * dropped instead of failing the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    import scala.util.Try

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputStream = env.socketTextStream("server120", 9999)

    // Side-output tags are created once and shared by the producer (ProcessFunction)
    // and the consumers (getSideOutput), so id and element type cannot drift apart.
    val stuTag = new OutputTag[(Int, String)]("stu")
    val projectTag = new OutputTag[(Int, Int, String, Int)]("project")

    val resultStream = inputStream
      // Drop null / empty lines before parsing.
      .filter(line => line != null && line.nonEmpty)
      // [String, String] are the declared input/output types of the main stream;
      // side outputs are not bound to the declared output type.
      .process(new ProcessFunction[String, String]() {
        override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
          // Split on commas and dispatch on the record-type prefix. Extra trailing
          // fields are tolerated (`_*`), matching the original index-based access.
          value.split(",") match {
            case Array("stu", id, name, _*) =>
              // Only the parse is wrapped in Try, so failures of ctx.output still surface.
              Try(id.toInt).foreach(studentId => ctx.output(stuTag, (studentId, name)))

            case Array("project", id, stuId, subject, score, _*) =>
              for {
                projectId <- Try(id.toInt)
                studentId <- Try(stuId.toInt)
                points <- Try(score.toInt)
              } ctx.output(projectTag, (projectId, studentId, subject, points))

            case _ =>
              // Unknown prefix or too few fields: ignore the line rather than
              // throwing a MatchError / ArrayIndexOutOfBoundsException.
              ()
          }
        }
      })

    // Retrieve the two side-output streams from the processed stream.
    val stu = resultStream.getSideOutput(stuTag)
    val project = resultStream.getSideOutput(projectTag)

    // Join the two streams on student id.
    stu.join(project)
      .where(_._1) // student id: first field of the student record
      .equalTo(_._2) // student id: second field of the project record
      // Tumbling processing-time window of 15 seconds.
      .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
      // JoinFunction type parameters are [left input, right input, output].
      .apply(new JoinFunction[(Int, String), (Int, Int, String, Int), (Int, Int, String, String, Int)] {
        // Produces (studentId, projectId, studentName, subject, score).
        override def join(first: (Int, String), second: (Int, Int, String, Int)): (Int, Int, String, String, Int) = {
          (first._1, second._1, first._2, second._3, second._4)
        }
      })
      .print()

    // Submit the job; this call blocks while the streaming job runs.
    env.execute()
  }
}
